package com.gaodun.scrm.common.rabbitmq.config;

import com.alibaba.fastjson.JSON;
import com.gaodun.scrm.common.rabbitmq.constant.RabbitConst;
import com.gaodun.scrm.common.rabbitmq.entity.LuffyCorrelationData;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;

/**
 * @Description 消息发送确认
 * <p>
 * ConfirmCallback  只确认消息是否正确到达 Exchange 中
 * ReturnCallback   消息没有正确到达队列时触发回调，如果正确到达队列不执行
 * <p>
 * 1. 如果消息没有到exchange,则confirm回调,ack=false
 * 2. 如果消息到达exchange,则confirm回调,ack=true
 * 3. exchange到queue成功,则不回调return
 * 4. exchange到queue失败,则回调return
 */

@Component
@Slf4j
public class MQProducerAckConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Autowired
    private RedisTemplate redisTemplate;

    //要注入rabbitTemplate
    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
    }

    //交换机确认重试
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
//            log.info("消息发送成功：" + JSON.toJSONString(correlationData));
        } else {
            log.info("消息发送失败：" + cause + " 数据：" + JSON.toJSONString(correlationData));
            //重发
            LuffyCorrelationData luffyCorrelationData = (LuffyCorrelationData) correlationData;
            this.addRetry(luffyCorrelationData);
        }
    }

    private void addRetry(LuffyCorrelationData luffyCorrelationData) {
        try {
            int retryCount = luffyCorrelationData.getRetryCount();
            if (retryCount < RabbitConst.RETRY_COUNT) {
                luffyCorrelationData.setRetryCount(++retryCount);

                //更新缓存中重试次数
                redisTemplate.opsForValue().set(luffyCorrelationData.getId(),
                        JSON.toJSONString(luffyCorrelationData),
                        RabbitConst.OBJECT_TIMEOUT, TimeUnit.MINUTES);
                Thread.sleep(5000);

                rabbitTemplate.convertAndSend(luffyCorrelationData.getExchange(),
                        luffyCorrelationData.getRoutingKey(),
                        luffyCorrelationData.getMessage(),
                        luffyCorrelationData);
            } else {
                //重发次数用尽
                log.info("重发次数用尽:" + JSON.toJSONString(luffyCorrelationData));
            }

        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    //队列接收到消息确认
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        System.out.println("消息主体: " + new String(message.getBody()));
        System.out.println("应答码: " + replyCode);
        System.out.println("描述：" + replyText);
        System.out.println("消息使用的交换器 exchange : " + exchange);
        System.out.println("消息使用的路由键 routing : " + routingKey);

        String id = (String) message.getMessageProperties().getHeaders().get("spring_returned_message_correlation");
        String correlationData = (String) redisTemplate.opsForValue().get(id);
        this.addRetry(JSON.parseObject(correlationData, LuffyCorrelationData.class));
    }
}
